package com.itopener.tools.log.appender.kafka.logback;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.core.io.ClassPathResource;

import com.itopener.tools.log.appender.kafka.logback.utils.KafkaLogbackUtil;

/**  
 * @author fuwei.deng
 * @Date 2017年6月9日 下午3:10:58
 * @version 1.0.0
 */
public class Producer<E> {

	private Ssl<E> ssl = new Ssl<E>();

	/**
	 * Number of acknowledgments the producer requires the leader to have received
	 * before considering a request complete.
	 */
	private String acks;

	/**
	 * Number of records to batch before sending.
	 */
	private Integer batchSize;

	/**
	 * Comma-delimited list of host:port pairs to use for establishing the initial
	 * connection to the Kafka cluster.
	 */
	private String bootstrapServers;

	/**
	 * Total bytes of memory the producer can use to buffer records waiting to be sent
	 * to the server.
	 */
	private Long bufferMemory;

	/**
	 * Id to pass to the server when making requests; used for server-side logging.
	 */
	private String clientId;

	/**
	 * Compression type for all data generated by the producer.
	 */
	private String compressionType;

	/**
	 * Serializer class for keys.
	 */
	private Class<?> keySerializer = StringSerializer.class;

	/**
	 * Serializer class for values.
	 */
	private Class<?> valueSerializer = StringSerializer.class;

	/**
	 * When greater than zero, enables retrying of failed sends.
	 */
	private Integer retries;

	public Map<String, Object> buildProperties() {
		Map<String, Object> properties = new HashMap<String, Object>();
		if (this.acks != null) {
			properties.put(ProducerConfig.ACKS_CONFIG, this.acks);
		}
		if (this.batchSize != null) {
			properties.put(ProducerConfig.BATCH_SIZE_CONFIG, this.batchSize);
		}
		if (this.bootstrapServers != null) {
			properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, new ArrayList<String>(Collections.singletonList(this.bootstrapServers)));
		}
		if (this.bufferMemory != null) {
			properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, this.bufferMemory);
		}
		if (this.clientId != null) {
			properties.put(ProducerConfig.CLIENT_ID_CONFIG, this.clientId);
		}
		if (this.compressionType != null) {
			properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, this.compressionType);
		}
		if (this.keySerializer != null) {
			properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, this.keySerializer);
		}
		if (this.retries != null) {
			properties.put(ProducerConfig.RETRIES_CONFIG, this.retries);
		}
		if (this.ssl.getKeyPassword() != null) {
			properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, this.ssl.getKeyPassword());
		}
		if (this.ssl.getKeystoreLocation() != null) {
			properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, KafkaLogbackUtil.resourceToPath(new ClassPathResource(this.ssl.getKeystoreLocation())));
		}
		if (this.ssl.getKeystorePassword() != null) {
			properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, this.ssl.getKeystorePassword());
		}
		if (this.ssl.getTruststoreLocation() != null) {
			properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, KafkaLogbackUtil.resourceToPath(new ClassPathResource(this.ssl.getTruststoreLocation())));
		}
		if (this.ssl.getTruststorePassword() != null) {
			properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, this.ssl.getTruststorePassword());
		}
		if (this.valueSerializer != null) {
			properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, this.valueSerializer);
		}
		return properties;
	}

	public Ssl<E> getSsl() {
		return ssl;
	}

	public void setSsl(Ssl<E> ssl) {
		this.ssl = ssl;
	}

	public String getAcks() {
		return acks;
	}

	public void setAcks(String acks) {
		this.acks = acks;
	}

	public Integer getBatchSize() {
		return batchSize;
	}

	public void setBatchSize(Integer batchSize) {
		this.batchSize = batchSize;
	}

	public String getBootstrapServers() {
		return bootstrapServers;
	}

	public void setBootstrapServers(String bootstrapServers) {
		this.bootstrapServers = bootstrapServers;
	}

	public Long getBufferMemory() {
		return bufferMemory;
	}

	public void setBufferMemory(Long bufferMemory) {
		this.bufferMemory = bufferMemory;
	}

	public String getClientId() {
		return clientId;
	}

	public void setClientId(String clientId) {
		this.clientId = clientId;
	}

	public String getCompressionType() {
		return compressionType;
	}

	public void setCompressionType(String compressionType) {
		this.compressionType = compressionType;
	}

	public Class<?> getKeySerializer() {
		return keySerializer;
	}

	public void setKeySerializer(Class<?> keySerializer) {
		this.keySerializer = keySerializer;
	}

	public Class<?> getValueSerializer() {
		return valueSerializer;
	}

	public void setValueSerializer(Class<?> valueSerializer) {
		this.valueSerializer = valueSerializer;
	}

	public Integer getRetries() {
		return retries;
	}

	public void setRetries(Integer retries) {
		this.retries = retries;
	}
	
}
